from kafka import KafkaAdminClient
from kafka.admin import NewTopic

from air_web.config.config import config


def create_topic(topic_name, num_partitions=3, replication_factor=2):
    """Create the Kafka topic ``topic_name`` unless the cluster already lists it.

    Opens an admin connection to the brokers configured under
    ``config['KAFKA_HOST']``, checks the cluster's topic list, creates the
    topic when it is missing, and always closes the admin connection
    before returning.

    Args:
        topic_name: Name of the topic to create.
        num_partitions: Partition count used when the topic is created.
        replication_factor: Replication factor used when the topic is
            created.

    Returns:
        None.

    Raises:
        Whatever ``KafkaAdminClient`` raises when connecting, listing or
        creating topics; errors are not caught here.
    """
    # The configured value is comma-joined into one bootstrap string, so it
    # is presumably a list of "host:port" entries — TODO confirm.
    client = KafkaAdminClient(bootstrap_servers=','.join(config['KAFKA_HOST']))
    try:
        if topic_name in client.list_topics():
            # Topic is already present; nothing to create.
            return

        # Create the topic.
        new_topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )
        client.create_topics([new_topic])
        print(f"创建topic:{topic_name}")
    finally:
        # Release the broker connection even if listing or creation fails;
        # otherwise every call would leave an open admin client behind.
        client.close()
